feat(hybridcloud): async apigateway (#111307)
Conversation
```python
# so that responses aren't modified after Content-Length is set, or have the
# response modifying middleware reset the Content-Length header.
# This is because CommonMiddleware sets the Content-Length header for non-streaming responses.
APIGW_ASYNC = os.environ.get("SENTRY_ASYNC_APIGW", "").lower() in ("1", "true", "y", "yes")
```
Suggested change:

```diff
- APIGW_ASYNC = os.environ.get("SENTRY_ASYNC_APIGW", "").lower() in ("1", "true", "y", "yes")
+ APIGW_ASYNC = os.environ.get("SENTRY_ASYNC_APIGW", "") == "1"
```
Not blocking, but I usually would do the simplest thing that could work.
We don't have a strict rule about "boolean" env vars across the org. I prefer to map every possible case rather than deal with the headache of debugging why something doesn't work as expected because someone set the value to `true` rather than `1`.
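The trade-off discussed above can be captured in a small helper; `env_flag` and `TRUTHY` are illustrative names for this sketch, not Sentry's actual code:

```python
import os

# Accept the common truthy spellings instead of only "1", so a value like
# "true" still enables the flag. Hypothetical helper, for illustration only.
TRUTHY = ("1", "true", "y", "yes")

def env_flag(name: str, default: bool = False) -> bool:
    """Read an environment variable as a boolean, mapping common spellings."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in TRUTHY

os.environ["SENTRY_ASYNC_APIGW"] = "True"
APIGW_ASYNC = env_flag("SENTRY_ASYNC_APIGW")  # True, despite not being "1"
```

The strict `== "1"` variant is simpler, but silently disables the feature for any other spelling, which is exactly the debugging headache mentioned above.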
```python
self.concurrency = concurrency
self.counter_window = failures[0]
self.failures = failures[1]
self.semaphore = asyncio.Semaphore(self.concurrency)
```
If we're running multiple replicas there doesn't seem to be a way to circuit break across the deployment, each replica will have to figure out that there is a problem independently. I'm guessing you went with this approach because our existing circuit breakers use sync-redis?
Mainly for that reason, yes.

But also, I don't see much value in sharing the circuit-breaking data across the whole deployment (and across the next deployment when we release). Having each worker (not the whole pod) keep its own state IMO lets us configure more granular limits, and avoids conditions in which we overflow the concurrency circuit because of scaling and the number of running control pods. There may also be conditions in which a single pod fails to connect to the upstream for $REASONS (eg: machine-specific temporary network issues), and I don't think we want that single pod to overload the circuit for everything else.
That's all fair. We'll be able to use metrics to see how many gateway instances have breakers open, and observe how coherent that state is too.
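A per-worker breaker along the lines discussed might look roughly like this; all names and window semantics here are assumptions for illustration, not the PR's actual implementation:

```python
import asyncio
import time

class WorkerCircuitBreaker:
    """Per-worker breaker sketch: a concurrency semaphore plus a failure
    counter over a rolling time window. State is local to the worker,
    matching the per-pod reasoning above."""

    def __init__(self, concurrency: int, counter_window: float, max_failures: int):
        self.semaphore = asyncio.Semaphore(concurrency)
        self.counter_window = counter_window
        self.max_failures = max_failures
        self._failures = 0
        self._window_start = time.monotonic()

    def record_failure(self) -> None:
        now = time.monotonic()
        if now - self._window_start > self.counter_window:
            # Start a fresh window instead of accumulating forever.
            self._failures = 0
            self._window_start = now
        self._failures += 1

    def is_open(self) -> bool:
        # Trip once the configured maximum is reached.
        return self._failures >= self.max_failures
```

Because the state is plain in-process memory, nothing here needs Redis or any cross-replica coordination, which is the design choice being defended above.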
```python
@contextmanager
def mock_proxy_client(router: HttpxMockRouter):
    """Patch the proxy_client with a mock httpx.AsyncClient using the given router."""
    mock_client = httpx.AsyncClient(transport=httpx.MockTransport(router.handler))
    with patch("sentry.hybridcloud.apigateway_async.proxy.proxy_client", mock_client):
        yield mock_client
```
Unfortunate that there isn't a library like responses for httpx 😢
```python
from asgiref.sync import sync_to_async
from django.test.client import Client


class MockedProxy:
```
Do the tests run with async mode?
Yes, but since our tests run with Django's sync test client, the code path actually exercised is the async_to_sync one in the middleware.

The only tests that run with ASGI are the ones using the liveserver with the control silo, as that server launches following the devserver configuration, which is set to ASGI for control.
```python
# FIXME: when in ASGI, the call to `options.store` from `in_random_rollout`
# would fail, because of SyncOnlyOperation.
```
This is going to be a big footgun if we can't use options in the async context.
We can, as long as it's inside a sync_to_async block.

AFAICT this is the only problematic code section; it doesn't invalidate the use of options.store in ASGI. But we might need to review more middlewares and add an async-specific flow. Hard to tell right now without further live testing.
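The pattern being described looks roughly like the following. The option lookup is stood in for by a plain sync function, and stdlib `asyncio.to_thread` stands in for `asgiref.sync.sync_to_async`, which plays the same role in real Django code:

```python
import asyncio

# Stand-in for a sync-only call such as `options.get(...)` / `options.store`:
# under ASGI it must not run directly on the event loop thread.
def get_option_sync(key: str) -> bool:
    # Imagine this hitting the database or a sync cache.
    return key == "apigateway.enabled"

async def in_random_rollout(key: str) -> bool:
    # Off-load the blocking call to a worker thread. With asgiref installed,
    # `await sync_to_async(get_option_sync)(key)` is the equivalent.
    return await asyncio.to_thread(get_option_sync, key)

result = asyncio.run(in_random_rollout("apigateway.enabled"))
```

The key point is that the sync call still works under ASGI, it just has to be bridged explicitly rather than invoked inline.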
```python
assert request.method is not None
query_params = request.GET

timeout = ENDPOINT_TIMEOUT_OVERRIDE.get(url_name, settings.GATEWAY_PROXY_TIMEOUT)
```
Bug: The async proxy removes runtime timeout configuration. The default `GATEWAY_PROXY_TIMEOUT` is `None`, which can cause requests to hang indefinitely for most endpoints.

Severity: HIGH

Suggested fix: Restore the previous timeout logic by first checking `options.get("apigateway.proxy.timeout")`, then falling back to `settings.GATEWAY_PROXY_TIMEOUT`, and finally applying endpoint-specific overrides. Alternatively, set a reasonable default timeout instead of `None`.
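The suggested fallback chain can be sketched as follows; `get_runtime_option`, the hypothetical constants, and the 30-second safety net are assumptions standing in for Sentry's options store and settings:

```python
# Endpoint-specific overrides win, then the runtime option, then the setting,
# then a hard default, so httpx never ends up with timeout=None (infinite).
ENDPOINT_TIMEOUT_OVERRIDE = {"sentry-api-0-slow-endpoint": 60.0}
GATEWAY_PROXY_TIMEOUT = None  # the settings default discussed above
DEFAULT_PROXY_TIMEOUT = 30.0  # hypothetical safety net instead of "no timeout"

def get_runtime_option(key):
    # Placeholder: would consult the runtime options store.
    return None

def resolve_timeout(url_name: str) -> float:
    if url_name in ENDPOINT_TIMEOUT_OVERRIDE:
        return ENDPOINT_TIMEOUT_OVERRIDE[url_name]
    for candidate in (get_runtime_option("apigateway.proxy.timeout"), GATEWAY_PROXY_TIMEOUT):
        if candidate is not None:
            return candidate
    return DEFAULT_PROXY_TIMEOUT
```

With this shape, a `None` setting degrades to a bounded default rather than disabling timeouts entirely.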
Location: `src/sentry/hybridcloud/apigateway_async/proxy.py#L163`

Potential issue: The new async proxy implementation removes the check for the runtime option `options.get("apigateway.proxy.timeout")`. The fallback, `settings.GATEWAY_PROXY_TIMEOUT`, is `None`, which disables timeouts for the `httpx` client. Since only a few endpoints have explicit overrides in `ENDPOINT_TIMEOUT_OVERRIDE`, most API gateway requests will have no timeout. This can cause requests to hang indefinitely, potentially exhausting the connection pool and defeating the purpose of the async refactor.
```python
proxy_client = httpx.AsyncClient()
circuitbreakers = CircuitBreakerManager()
```
Bug: The module-level `CircuitBreakerManager` creates `asyncio.Semaphore` instances that are not thread-safe, causing a `RuntimeError` when accessed from different worker threads in a multi-threaded server setup.

Severity: HIGH

Suggested fix: The `CircuitBreakerManager` instance should not be a module-level singleton. It should be instantiated within a context that ensures each worker thread or process gets its own instance, preventing cross-thread access to event-loop-bound objects.
Location: `src/sentry/hybridcloud/apigateway_async/proxy.py#L44-L45`

Potential issue: A module-level `CircuitBreakerManager` is initialized, which is shared across all worker threads. The manager lazily creates `asyncio.Semaphore` instances, which become bound to the event loop of the thread that first accesses them. In a multi-threaded environment (like Granian with Python 3.13+), if another thread tries to access the same circuit breaker, it will receive a semaphore bound to a different event loop. This will raise a `RuntimeError: "Task got Future attached to a different loop"`, causing a runtime crash.
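One way to sidestep the cross-loop issue is to key the lazily-created semaphores by the running event loop, so each worker thread (each with its own loop) gets its own instance. This is an assumed design sketch, not the fix the PR ships:

```python
import asyncio

class LoopLocalSemaphores:
    """Lazily create one semaphore per running event loop, so a semaphore
    created under one thread's loop is never awaited from another."""

    def __init__(self, concurrency: int):
        self.concurrency = concurrency
        self._per_loop: dict[int, asyncio.Semaphore] = {}

    def get(self) -> asyncio.Semaphore:
        # get_running_loop() raises if called outside a coroutine, which is
        # fine: this helper is only meaningful inside async code.
        loop = asyncio.get_running_loop()
        key = id(loop)
        if key not in self._per_loop:
            self._per_loop[key] = asyncio.Semaphore(self.concurrency)
        return self._per_loop[key]

async def demo() -> bool:
    sems = LoopLocalSemaphores(concurrency=2)
    async with sems.get():
        return True

ok = asyncio.run(demo())
```

A real version would also need to evict entries for closed loops to avoid unbounded growth; the sketch omits that for brevity.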
Cursor Bugbot has reviewed your changes and found 2 potential issues.
```python
def window_overflow(self) -> bool:
    self._maybe_counter_flip()
    return self._counters[self._counter_idx] > self.failures
```
Circuit breaker off-by-one in failure threshold check

Severity: LOW

The `window_overflow` method uses `>` instead of `>=` when comparing the failure counter against `self.failures` (sourced from `APIGATEWAY_PROXY_MAX_FAILURES`). This means that if the max is configured to 100, the circuit breaker actually allows 101 failures before tripping. Given the setting name implies an upper bound, `>=` would match the intended semantics.
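The off-by-one is easy to see in isolation: with a maximum of 3, `>` trips only on the 4th failure, while `>=` trips on the 3rd.

```python
def tripped_gt(count: int, max_failures: int) -> bool:
    # Current behavior: the configured maximum is still allowed through.
    return count > max_failures

def tripped_ge(count: int, max_failures: int) -> bool:
    # Suggested behavior: trip at the configured maximum.
    return count >= max_failures

assert tripped_gt(3, 3) is False  # 3 failures do not trip with ">"
assert tripped_ge(3, 3) is True   # but do trip with ">="
```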
```python
# for the moment let's just simplify and skip this entirely.
asyncio.get_running_loop()
return None
except Exception:
```
Overly broad exception catch masks potential errors

Severity: LOW

`asyncio.get_running_loop()` only raises `RuntimeError` when no event loop is running, but the handler catches bare `Exception`. This silently swallows any unexpected error in the try block, making future bugs in this area harder to diagnose. Using `except RuntimeError` would correctly express the intent.
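A narrow-catch version of the pattern could be, as a sketch:

```python
import asyncio

def running_loop_or_none():
    """Return the running event loop, or None when called outside one.
    Catching only RuntimeError keeps unrelated bugs visible, per the
    comment above."""
    try:
        return asyncio.get_running_loop()
    except RuntimeError:  # raised only when no loop runs in this thread
        return None

outside = running_loop_or_none()  # None at module level

async def _probe():
    return running_loop_or_none()

inside = asyncio.run(_probe())  # the loop driving _probe
```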
markstory left a comment:

Approving so that we can test this out further with a small amount of live traffic, to see whether we should keep figuring out how to run this long term.
```python
assert request.method is not None
query_params = request.GET

timeout = ENDPOINT_TIMEOUT_OVERRIDE.get(url_name, settings.GATEWAY_PROXY_TIMEOUT)
```
We should use the `apigateway.proxy.timeout` option as a fallback instead of the setting, as we don't have any values for the setting in production.
I will address this in the next steps; calling into options here might be a bad idea (blocking code), and we might instead start using the setting in production. It needs proper testing.
```python
    url_name: str,
) -> HttpResponseBase:
    """Take a django request object and proxy it to a cell silo"""
    metric_tags = {"region": cell.name, "url_name": url_name}
```
Should we have a metric tag that lets us know metric values are coming from the async gateway and not the sync one? Right now I don't think we'd be able to tell the two apart other than by pod names.
Gonna address this in a following PR; for now the pod name should be good, as we'll run this in a single pod.
This changes the `apigateway` proxy to be async, with the idea to serve the relevant deployment of the control silo in ASGI rather than WSGI. The rationale is to avoid situations in which we exhaust the server's threadpool just waiting for apigateway requests to complete, as we saw in INCs 2054/2056.

**Note:** the APIGateway changes are gated into a separate Python module; the async flow is enabled through the `SENTRY_ASYNC_APIGW` environment variable. This allows us to control the rollout of the change in prod. Tests and the local devserver always use the new code.

Detailed changes:

- [x] Make the APIGateway proxy `async`, switching the inner client implementation from `requests` to `httpx`
- [x] Change the APIGateway middleware to work in both ASGI and WSGI contexts (the latter using `async_to_sync`)
- [x] Update relevant tests interacting with the APIGateway
- [x] Fix the proxy acceptance test
- ~~Fix ORM calls in the custom SDK integration~~
- [x] Bypass ORM calls in the SDK custom logging integration
- [x] Restore/adapt circuit breakers

